# Solutions/Oracle Cloud Infrastructure/Data Connectors/AzureFunctionOCILogs/main.py
import json
import oci
import os
import logging
import re
from base64 import b64decode
import azure.functions as func
import time
from .sentinel_connector import AzureSentinelConnector
logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.ERROR)
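
# Required application settings: the OCI Streaming endpoint and stream OCID,
# plus the Log Analytics (Sentinel) workspace ID and shared key that events
# are posted to.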
MessageEndpoint = os.environ['MessageEndpoint']
StreamOcid = os.environ['StreamOcid']
WORKSPACE_ID = os.environ['AzureSentinelWorkspaceId']
SHARED_KEY = os.environ['AzureSentinelSharedKey']
LOG_TYPE = 'OCI_Logs'
CURSOR_TYPE = os.getenv('CursorType', 'group')
MAX_SCRIPT_EXEC_TIME_MINUTES = 5
PARTITIONS = os.getenv('Partition', "0")
Message_Limit = os.getenv('Message_Limit', 250)
limit = int(Message_Limit)
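
# Individual Log Analytics field values are limited to roughly 32 KB, so any
# event field whose JSON encoding exceeds this size is split into
# "<field>PartN" columns before ingestion (see process_large_field below).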
FIELD_SIZE_LIMIT_BYTES = 1000 * 32
LOG_ANALYTICS_URI = os.environ.get('logAnalyticsUri')
if not LOG_ANALYTICS_URI or str(LOG_ANALYTICS_URI).isspace():
    LOG_ANALYTICS_URI = 'https://' + WORKSPACE_ID + '.ods.opinsights.azure.com'

pattern = r"https:\/\/([\w\-]+)\.ods\.opinsights\.azure\.([a-zA-Z\.]+)$"
match = re.match(pattern, str(LOG_ANALYTICS_URI))
if not match:
    raise Exception("Invalid Log Analytics Uri.")
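
# Timer-triggered entry point: validate the OCI config, open a cursor on the
# stream (consumer-group or partition based, depending on CursorType), then
# read messages in batches and forward them to the Sentinel workspace.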
def main(mytimer: func.TimerRequest):
    logging.info('Function started.')
    start_ts = int(time.time())
    config = get_config()
    oci.config.validate_config(config)
    sentinel_connector = AzureSentinelConnector(LOG_ANALYTICS_URI, WORKSPACE_ID, SHARED_KEY, LOG_TYPE, queue_size=2000)
    stream_client = oci.streaming.StreamClient(config, service_endpoint=MessageEndpoint)
    if CURSOR_TYPE.lower() == 'group':
        cursor = get_cursor_by_group(stream_client, StreamOcid, "group1", "group1-instance1")
    else:
        cursor = get_cursor_by_partition(stream_client, StreamOcid, partition=PARTITIONS)
    process_events(stream_client, StreamOcid, cursor, limit, sentinel_connector, start_ts)
    logging.info(f'Function finished. Sent events {sentinel_connector.successfull_sent_events_number}.')
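
# The private key arrives through an application setting as a single line, so
# the PEM markers, optional encryption headers (Proc-Type / DEK-Info) and the
# base64 body are re-assembled into a multi-line key before the OCI SDK uses
# it as key_content.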
def parse_key(key_input):
    try:
        begin_line = re.search(r'-----BEGIN [A-Z ]+-----', key_input).group()
        key_input = key_input.replace(begin_line, '')
        end_line = re.search(r'-----END [A-Z ]+-----', key_input).group()
        key_input = key_input.replace(end_line, '')
        encr_lines = ''
        proc_type_line = re.search(r'Proc-Type: [^ ]+', key_input)
        if proc_type_line:
            proc_type_line = proc_type_line.group()
            dec_info_line = re.search(r'DEK-Info: [^ ]+', key_input).group()
            encr_lines += proc_type_line + '\n'
            encr_lines += dec_info_line + '\n'
            key_input = key_input.replace(proc_type_line, '')
            key_input = key_input.replace(dec_info_line, '')
        body = key_input.strip().replace(' ', '\n')
        res = ''
        res += begin_line + '\n'
        if encr_lines:
            res += encr_lines + '\n'
        res += body + '\n'
        res += end_line
    except Exception:
        raise Exception('Error while reading private key.')
    return res
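
# Build the OCI SDK configuration dictionary from application settings; the
# keys mirror the standard OCI config entries (user, fingerprint, tenancy,
# region, key_content, pass_phrase).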
def get_config():
    config = {
        "user": os.environ['user'],
        "key_content": parse_key(os.environ['key_content']),
        "pass_phrase": os.environ.get('pass_phrase', ''),
        "fingerprint": os.environ['fingerprint'],
        "tenancy": os.environ['tenancy'],
        "region": os.environ['region']
    }
    return config
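
# Both cursor helpers start from TRIM_HORIZON (the oldest retained message).
# The group cursor commits offsets on each get, so consecutive function runs
# continue from where the previous run stopped.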
def get_cursor_by_group(sc, sid, group_name, instance_name):
    logging.info("Creating a cursor for group {}, instance {}".format(group_name, instance_name))
    cursor_details = oci.streaming.models.CreateGroupCursorDetails(
        group_name=group_name,
        instance_name=instance_name,
        type=oci.streaming.models.CreateGroupCursorDetails.TYPE_TRIM_HORIZON,
        commit_on_get=True)
    response = sc.create_group_cursor(sid, cursor_details)
    return response.data.value

def get_cursor_by_partition(client, stream_id, partition):
    logging.info("Creating a cursor for partition {}".format(partition))
    cursor_details = oci.streaming.models.CreateCursorDetails(
        partition=partition,
        type=oci.streaming.models.CreateCursorDetails.TYPE_TRIM_HORIZON)
    response = client.create_cursor(stream_id, cursor_details)
    cursor = response.data.value
    return cursor
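
# Helpers for keeping serialized fields under FIELD_SIZE_LIMIT_BYTES:
# split_big_request recursively halves a list until every chunk fits. For
# example, a list whose JSON encoding is ~70 KB is split into two halves, and
# each half is split again until all chunks are below the limit.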
def check_size(queue):
    data_bytes_len = len(json.dumps(queue).encode())
    return data_bytes_len < FIELD_SIZE_LIMIT_BYTES


def split_big_request(queue):
    if check_size(queue):
        return [queue]
    else:
        middle = int(len(queue) / 2)
        queues_list = [queue[:middle], queue[middle:]]
        return split_big_request(queues_list[0]) + split_big_request(queues_list[1])

def process_large_field(event_section, field_name, field_size_limit, max_part=10):
    """Process and split large fields in the event data if they exceed the size limit."""
    if field_name in event_section:
        field_data = event_section[field_name]
        if len(json.dumps(field_data).encode()) > field_size_limit:
            # Split if field_data is a list
            if isinstance(field_data, list):
                queue_list = split_big_request(field_data)
                for count, item in enumerate(queue_list, 1):
                    if count > max_part:
                        break
                    event_section[f"{field_name}Part{count}"] = item
                del event_section[field_name]
            # Split if field_data is a dictionary
            elif isinstance(field_data, dict):
                queue_list = list(field_data.keys())
                for count, key in enumerate(queue_list, 1):
                    if count > max_part:
                        break
                    event_section[f"{field_name}Part{key}"] = field_data[key]
                del event_section[field_name]
            else:
                pass
        else:
            # If within size limit, just serialize it
            event_section[field_name] = json.dumps(field_data)
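
# Poll the stream until it is drained or the time budget is exhausted: decode
# each base64 message, shrink oversized audit-log fields, queue the event for
# Sentinel, flush the batch, then follow the opc-next-cursor header to the
# next page of messages.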
def process_events(client: oci.streaming.StreamClient, stream_id, initial_cursor, limit, sentinel: AzureSentinelConnector, start_ts):
    cursor = initial_cursor
    while True:
        get_response = client.get_messages(stream_id, cursor, limit=limit, retry_strategy=oci.retry.DEFAULT_RETRY_STRATEGY)
        if not get_response.data:
            return
        for message in get_response.data:
            if message:
                event = b64decode(message.value.encode()).decode()
                logging.info('event details {}'.format(event))
                myjson = str(event)
                if myjson.startswith("{"):
                    # if event != 'ok' and event != 'Test':
                    event = json.loads(event)
                    if "data" in event:
                        if "request" in event["data"] and event["type"] != "com.oraclecloud.loadbalancer.access":
                            if event["data"]["request"] is not None:
                                # Process "headers" and "parameters" in "request"
                                if "headers" in event["data"]["request"]:
                                    process_large_field(event["data"]["request"], "headers", FIELD_SIZE_LIMIT_BYTES)
                                if "parameters" in event["data"]["request"]:
                                    process_large_field(event["data"]["request"], "parameters", FIELD_SIZE_LIMIT_BYTES)
                        if "response" in event["data"] and event["data"]["response"] is not None:
                            # Process "headers" in "response"
                            if "headers" in event["data"]["response"]:
                                process_large_field(event["data"]["response"], "headers", FIELD_SIZE_LIMIT_BYTES)
                        if "additionalDetails" in event["data"]:
                            process_large_field(event["data"], "additionalDetails", FIELD_SIZE_LIMIT_BYTES)
                        if "stateChange" in event["data"] and event["data"]["stateChange"] is not None:
                            # Process "current" in "stateChange"
                            if "current" in event["data"]["stateChange"]:
                                process_large_field(event["data"]["stateChange"], "current", FIELD_SIZE_LIMIT_BYTES)
                sentinel.send(event)
        sentinel.flush()
        if check_if_script_runs_too_long(start_ts):
            logging.info('Script is running too long. Saving progress and exit.')
            break
        cursor = get_response.headers["opc-next-cursor"]
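
# Stop at ~85% of the configured execution window so the current batch can be
# flushed and the cursor committed before the function host's timeout is
# reached.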
def check_if_script_runs_too_long(start_ts):
    now = int(time.time())
    duration = now - start_ts
    max_duration = int(MAX_SCRIPT_EXEC_TIME_MINUTES * 60 * 0.85)
    return duration > max_duration